Fork me on GitHub

【Java多线程】JUC锁 04. AQS

AbstractQueuedSynchronizer

1. 前言

  • AQS是java中管理“锁”的抽象类,依赖于FIFO等待队列,锁的许多公共方法都是在这个类中实现
  • AQS锁的类别 – 分为“独占锁”和“共享锁”两种。
    • 独占锁 – 锁在一个时间点只能被一个线程锁占有。根据锁的获取机制,它又划分为“公平锁”和“非公平锁”。公平锁,是按照通过CLH等待线程按照先来先得的规则,公平的获取锁;而非公平锁,则当线程要获取锁时,它会无视CLH等待队列而直接获取锁。独占锁的典型实例子是ReentrantLock,此外,ReentrantReadWriteLock.WriteLock也是独占锁。
    • 共享锁 – 能被多个线程同时拥有,能被共享的锁。JUC包中的ReentrantReadWriteLock.ReadLock,CyclicBarrier, CountDownLatch和Semaphore都是共享锁。
  • 使用AQS作为基础同步器,需要通过更改State状态重写tryAcquire, tryRelease, tryAcquireShared, tryReleaseShared, isHeldExclusively这些方法

2. 源码解析

2.1 数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// 版本号
private static final long serialVersionUID = 7373984972572414691L;
// 头结点
private transient volatile Node head;
// 尾结点
private transient volatile Node tail;
// 状态
private volatile int state;
// 自旋时间
static final long spinForTimeoutThreshold = 1000L;
}

AQS基于FIFO队列,底层数据结构为双向链表,依据head,tail指针来调度锁的获取和释放。Condition queue 是单向链表,不是必须的,当使用Condition才有此单向链表;可能有多个Condition queue

2.2 内部类

  • Node类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
static final class Node {
// 模式,分为共享与独占
// 共享模式
static final Node SHARED = new Node();
// 独占模式
static final Node EXCLUSIVE = null;
// 结点状态
// CANCELLED,值为1,表示当前的线程被取消
// SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark
// CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中
// PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行
// 值为0,表示当前节点在sync队列中,等待着获取锁
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

// 结点状态
volatile int waitStatus;
// 前驱结点
volatile Node prev;
// 后继结点
volatile Node next;
// 结点所对应的线程
volatile Thread thread;
// 通过nextWaiter来区分线程是“独占锁”线程还是“共享锁”线程。如果是“独占锁”线程,则nextWaiter的值为EXCLUSIVE;如果是“共享锁”线程,则nextWaiter的值是SHARED。
Node nextWaiter;

// 结点是否在共享模式下等待
final boolean isShared() {
return nextWaiter == SHARED;
}

// 获取前驱结点,若前驱结点为空,抛出异常
final Node predecessor() throws NullPointerException {
// 保存前驱结点
Node p = prev;
if (p == null) // 前驱结点为空,抛出异常
throw new NullPointerException();
else // 前驱结点不为空,返回
return p;
}

// 无参构造函数
Node() { // Used to establish initial head or SHARED marker
}

// 构造函数
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

// 构造函数
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
  • ConditionObject
1
2
3
4
5
6
7
8
9
10
11
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT = 1; //中断模式 - 可重新中断
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE = -1; //中断模式 - 抛出异常
}

2.3 独占锁和共享锁

2.3.1 独占锁

具体见ReentrantLock获取锁流程

  • acquire() 获取锁

独占模式获取锁,忽略中断

1
2
3
4
5
6
// 获取锁
public final void acquire(int arg) {
if (!tryAcquire(arg) && //尝试获取
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //加入CLH队列,在队列中阻塞获取
selfInterrupt();
}
  • addWaiter()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 加入等待队列尾节点
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 自旋加入CLH末尾,若CLH为空,则新建一个CLH表头
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

其中,compareAndSetHead和compareAndSetTail 通过CAS方法操作当前线程

1
2
3
4
5
6
7
8
//实际调用的UnSafe类
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
  • acquireQueued()

acquireQueued()的作用就是“当前线程”会根据公平性原则进行阻塞等待,直到获取锁为止;并且返回当前线程在等待过程中有没有并中断过。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//获取锁
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取上一个等待的线程
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { //前继节点是否为头节点,公平性原则
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 判断“当前线程是否应该阻塞”
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 如果前继节点是SIGNAL状态,则意味这当前线程需要被unpark唤醒。此时,返回true。
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
// 状态为CANCELLED = 1,则设置 “当前节点”的 “当前前继节点” 为 “‘原前继节点’的前继节点”。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 状态为“0”或者“共享锁-3”状态,则设置前继节点为SIGNAL状态。为-2 则在condition queue中
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
1
2
3
4
5
6
private final boolean parkAndCheckInterrupt() {
// 通过LockSupport的park()阻塞“当前线程”。
LockSupport.park(this);
// 返回线程的中断状态。
return Thread.interrupted();
}

线程被阻塞之后唤醒,一般有2种情况:

  1. unpark()唤醒。“前继节点对应的线程”使用完锁之后,通过unpark()方式唤醒当前线程。
  2. 中断唤醒。其它线程通过interrupt()中断当前线程。
  • cancelAcquire()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 取消继续获取(资源)
private void cancelAcquire(Node node) {
if (node == null)
return;
// 设置node结点的thread为空
node.thread = null;

// 保存node的前驱结点
Node pred = node.prev;
while (pred.waitStatus > 0) // 找到node前驱结点中第一个状态小于0的结点,即不为CANCELLED状态的结点
node.prev = pred = pred.prev;

// 获取pred结点的下一个结点
Node predNext = pred.next;

// 设置node结点的状态为CANCELLED
node.waitStatus = Node.CANCELLED;

// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) { // node结点为尾结点,则设置尾结点为pred结点
// 比较并设置pred结点的next节点为null
compareAndSetNext(pred, predNext, null);
} else { // node结点不为尾结点,或者比较设置不成功
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) { // (pred结点不为头结点,并且pred结点的状态为SIGNAL)或者 pred结点状态小于等于0,并且比较并设置等待状态为SIGNAL成功,并且pred结点所封装的线程不为空
// 保存结点的后继
Node next = node.next;
if (next != null && next.waitStatus <= 0) // 后继不为空并且后继的状态小于等于0
compareAndSetNext(pred, predNext, next); // 比较并设置pred.next = next;
} else {
unparkSuccessor(node); // 释放node的前一个结点
}
node.next = node; // help GC
}
}
  • release() 释放锁
1
2
3
4
5
6
7
8
9
10
//尝试释放当前线程锁持有的锁。成功的话,则唤醒后继等待线程,并返回true。否则,直接返回false。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//唤醒当前线程的后继线程
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0) //设置状态为0, 可获取锁
compareAndSetWaitStatus(node, ws, 0);
//获取当前节点的“有效的后继节点”,无效的话,则通过for循环进行获取。
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

2.3.2 共享锁

具体见ReentrantReadWriteLock.ReadLock

  • acquireShared() 获取共享锁
1
2
3
4
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) //尝试获取共享锁
doAcquireShared(arg);
}
  • doAcquireShared()

doAcquireShared()的作用是在CLH队列中自旋获取共享锁。doAcquireShared()在每一次尝试获取锁时,是通过tryAcquireShared()来执行的!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void doAcquireShared(int arg) {
// 创建“当前线程”对应的Shared节点,并将该线程添加到CLH队列中。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取“node”的前一节点
final Node p = node.predecessor();
// 如果“当前线程”是CLH队列的表头,则尝试获取共享锁。
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 如果“当前线程”不是CLH队列的表头,则通过shouldParkAfterFailedAcquire()判断是否需要等待,
// 需要的话,则通过parkAndCheckInterrupt()进行阻塞等待。若阻塞等待过程中,线程被中断过,则设置interrupted为true。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
  • releaseShared()释放锁
1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //尝试释放锁,在子类实现
doReleaseShared();
return true;
}
return false;
}
  • doAcquireShared

doAcquireShared()的作用是在CLH队列中自旋获取共享锁。doAcquireShared()在每一次尝试获取锁时,是通过tryAcquireShared()来执行的!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void doAcquireShared(int arg) {
// 创建“当前线程”对应的Shared节点,并将该线程添加到CLH队列中。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取“node”的前一节点
final Node p = node.predecessor();
// 如果“当前线程”是CLH队列的表头,则尝试获取共享锁。
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 如果“当前线程”不是CLH队列的表头,则通过shouldParkAfterFailedAcquire()判断是否需要等待,
// 需要的话,则通过parkAndCheckInterrupt()进行阻塞等待。若阻塞等待过程中,线程被中断过,则设置interrupted为true。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

3. 参考

http://www.cnblogs.com/leesf456/p/5350186.html